{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "import pyspark\n",
    "from pyspark.conf import SparkConf\n",
    "from pyspark.sql import SparkSession\n",
    "import pandas as pd\n",
    "import requests\n",
    "\n",
    "# Endpoints for the \"docker compose\" testing and development environment.\n",
    "# Replace the host 'lakekeeper' when running elsewhere (e.g. 'localhost' if Lakekeeper is running locally).\n",
    "CATALOG_URL = \"http://lakekeeper:8181/catalog\"\n",
    "MANAGEMENT_URL = \"http://lakekeeper:8181/management\"\n",
    "\n",
    "# Names of the warehouse, Spark catalog and namespace used by the cells below.\n",
    "WAREHOUSE = \"new_warehouse\"\n",
    "CATALOG = \"demo_catalog\"\n",
    "NAMESPACE = \"demo_namespace\"\n",
    "\n",
    "# SPARK_MINOR_VERSION keeps only the first two dot-separated components of the pyspark version.\n",
    "SPARK_VERSION = pyspark.__version__\n",
    "SPARK_MINOR_VERSION = \".\".join(SPARK_VERSION.split(\".\", 2)[:2])\n",
    "ICEBERG_VERSION = \"1.6.1\""
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Create a new Warehouse"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create the warehouse by POSTing its definition to the management API.\n",
    "response = requests.post(\n",
    "    f\"{MANAGEMENT_URL}/v1/warehouse\",\n",
    "    json={\n",
    "        # Name of the new warehouse\n",
    "        \"warehouse-name\": WAREHOUSE,\n",
    "        # Physical location of this warehouse\n",
    "        \"storage-profile\": {\n",
    "            \"type\": \"s3\",\n",
    "            \"bucket\": \"examples\",\n",
    "            \"flavor\": \"s3-compat\",  # For AWS warehouses, use flavor \"aws\"\n",
    "            # You can change the prefix to something else, e.g. f\"{WAREHOUSE}/\",\n",
    "            # as long as it is unique in the bucket.\n",
    "            \"key-prefix\": \"path/to/new-warehouse/\",\n",
    "            \"assume-role-arn\": None,\n",
    "            \"endpoint\": \"http://minio:9000\",\n",
    "            \"region\": \"local-01\",\n",
    "            \"path-style-access\": True,\n",
    "            \"sts-enabled\": True,\n",
    "        },\n",
    "        # Storage Credentials for the profile specified above.\n",
    "        # These credentials are used to grant clients access to specific files in the storage.\n",
    "        # Clients do not need to know those credentials and will never obtain them directly.\n",
    "        # NOTE(review): hardcoded here for the demo setup; load real credentials from the\n",
    "        # environment or a secrets manager instead of committing them to a notebook.\n",
    "        \"storage-credential\": {\n",
    "            \"type\": \"s3\",\n",
    "            \"credential-type\": \"access-key\",\n",
    "            \"aws-access-key-id\": \"minio-root-user\",\n",
    "            \"aws-secret-access-key\": \"minio-root-password\",\n",
    "        },\n",
    "    },\n",
    "    # Without a timeout, requests waits indefinitely when the server does not answer.\n",
    "    timeout=30,  # seconds\n",
    ")\n",
    "print(f\"{response.status_code}: {response.reason}\")\n",
    "if not response.ok:\n",
    "    # Show the response body so the cause of the failure is visible.\n",
    "    print(response.text)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# As warehouse names must be unique inside a project, creating the same warehouse again would fail"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "# Connect with Spark"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Spark settings for an Iceberg REST catalog named CATALOG, reachable at CATALOG_URL.\n",
    "catalog_prefix = f\"spark.sql.catalog.{CATALOG}\"\n",
    "\n",
    "# Maven coordinates handed to Spark as one comma-separated string.\n",
    "iceberg_packages = [\n",
    "    f\"org.apache.iceberg:iceberg-spark-runtime-{SPARK_MINOR_VERSION}_2.12:{ICEBERG_VERSION}\",\n",
    "    f\"org.apache.iceberg:iceberg-aws-bundle:{ICEBERG_VERSION}\",\n",
    "]\n",
    "\n",
    "config = {\n",
    "    catalog_prefix: \"org.apache.iceberg.spark.SparkCatalog\",\n",
    "    f\"{catalog_prefix}.type\": \"rest\",\n",
    "    f\"{catalog_prefix}.uri\": CATALOG_URL,\n",
    "    f\"{catalog_prefix}.warehouse\": WAREHOUSE,\n",
    "    f\"{catalog_prefix}.io-impl\": \"org.apache.iceberg.aws.s3.S3FileIO\",\n",
    "    \"spark.sql.extensions\": \"org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions\",\n",
    "    \"spark.sql.defaultCatalog\": CATALOG,\n",
    "    \"spark.jars.packages\": \",\".join(iceberg_packages),\n",
    "}"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Local-mode Spark configuration carrying every entry of `config`.\n",
    "spark_config = (\n",
    "    SparkConf()\n",
    "    .setMaster(\"local\")\n",
    "    .setAppName(\"Iceberg-REST\")\n",
    "    .setAll(list(config.items()))\n",
    ")\n",
    "\n",
    "spark = SparkSession.builder.config(conf=spark_config).getOrCreate()\n",
    "\n",
    "# Make CATALOG the current catalog of this session.\n",
    "spark.sql(f\"USE {CATALOG}\")"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create the namespace unless it already exists, then display the namespace listing.\n",
    "spark.sql(f\"CREATE NAMESPACE IF NOT EXISTS {NAMESPACE}\")\n",
    "namespaces = spark.sql(\"SHOW NAMESPACES\")\n",
    "namespaces.toPandas()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# One-row sample frame (an int, a string and a float column), converted to a Spark DataFrame.\n",
    "sample_columns = [\"id\", \"strings\", \"floats\"]\n",
    "sample_rows = [[1, \"a-string\", 2.2]]\n",
    "data = pd.DataFrame(sample_rows, columns=sample_columns)\n",
    "sdf = spark.createDataFrame(data)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Create the table from `sdf`, replacing it if it already exists.\n",
    "table_writer = sdf.writeTo(f\"{NAMESPACE}.my_table\")\n",
    "table_writer.createOrReplace()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "# Read the table back and display it as a pandas DataFrame.\n",
    "select_all_query = f\"SELECT * FROM {NAMESPACE}.my_table\"\n",
    "spark.sql(select_all_query).toPandas()"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3 (ipykernel)",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.11.10"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 4
}
